
Governance Workflows: scheduleRunId correlation, batch trigger refactor, stage entity enrichment#27081

Open
yan-3005 wants to merge 20 commits into ram/inclusive-gateway from ram/workflow-instance-states-impl

yan-3005 commented Apr 6, 2026

Summary

  • Batch trigger refactor: removes MultiInstanceLoopCharacteristics from PeriodicBatchEntityTrigger; one CallActivity per fetch iteration passes the full entityList to the main workflow. Inclusive gateways inside MainWorkflow handle per-entity fan-out within a single Flowable execution.
  • scheduleRunId correlation: generates one UUID per trigger fire, propagated into every spawned WorkflowInstance and WorkflowInstanceState row — allows grouping all rows from one periodic trigger fire into a single scheduled run.
  • Stage entity enrichment: stage.entityList resolved at stage START using the node's inputNamespaceMap (same logic nodes use at runtime), so each stage records exactly the entity list it received via routing (entityList, gold_entityList, true_entityList, etc.). stage.updatedBy set at stage END from node-namespaced variable.
  • Instance enrichment: WorkflowInstance gains top-level entityList (from global_entityList at start) and updatedBy (scanned from variables at end).
  • DB migrations (1.14.0): generated columns + indexes for scheduleRunId on both workflow_instance_time_series and workflow_instance_state_time_series for MySQL and Postgres.
  • API: scheduleRunId query param on WorkflowInstanceResource and WorkflowInstanceStateResource for batch grouping queries.

Key design decisions

  • WorkflowScheduleRunIdSetterListener fires before WorkflowInstanceListener (ordering enforced in attachScheduleRunIdListener) so the trigger's own WorkflowInstance row also carries the scheduleRunId.
  • stage.entityList is fetched at stage start (not end) because the routing has already resolved which branch this execution is on — using getEntityList(inputNamespaceMap, varHandler) with the live DelegateExecution.
  • NewStageRequest record bundles the stage-creation parameters into one object, which keeps the method signature under the 5-parameter cap.
  • WorkflowVariableHandler.getEntityListFromVariables static overload allows map-based resolution without a live DelegateExecution.
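To illustrate the map-based resolution described above, here is a minimal sketch, not the actual WorkflowVariableHandler code. It assumes that inputNamespaceMap maps the variable name "entityList" to a namespace such as "global" or "gold", and that namespaced variables are stored as "<namespace>_<name>" (inferred from names like global_entityList and gold_entityList). The class name EntityListResolverSketch and the empty-list fallback are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; class name and fallback behaviour are assumptions.
final class EntityListResolverSketch {
  static final String ENTITY_LIST = "entityList";

  // Assumption: a namespaced variable is stored as "<namespace>_<name>".
  static String namespaced(String namespace, String name) {
    return namespace + "_" + name;
  }

  // Resolves the entity list from a plain variable map, so no live
  // DelegateExecution is needed.
  static List<String> getEntityListFromVariables(
      Map<String, String> inputNamespaceMap, Map<String, Object> variables) {
    String namespace = inputNamespaceMap.get(ENTITY_LIST);
    if (namespace == null) {
      return new ArrayList<>(); // no routing information for entityList
    }
    Object value = variables.get(namespaced(namespace, ENTITY_LIST));
    List<String> result = new ArrayList<>();
    if (value instanceof List) {
      for (Object item : (List<?>) value) {
        result.add(String.valueOf(item));
      }
    }
    return result; // empty when the key is missing or not a list
  }
}
```

With this shape, a stage routed through the "gold" branch would record only the contents of gold_entityList, while a stage with no mapping records an empty list.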

Test plan

  • WorkflowVariableHandlerTest — 6 cases: plain entityList, banded lists (gold_, silver_, true_), key-not-found, empty map
  • WorkflowScheduleRunIdReaderTest — 5 cases: UUID object, UUID-as-string, namespaced fallback, null when absent, plain takes precedence
  • WorkflowScheduleRunIdSetterListenerTest — sets on first call, idempotent, unique IDs per process
  • PeriodicBatchEntityTriggerTest — no multi-instance loop, no inherited business key, entityList + scheduleRunId IOParameters, process structure
  • TriggerFactoryTest — factory creates correct trigger type
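The three behaviours listed for WorkflowScheduleRunIdSetterListenerTest (sets on first call, idempotent, unique IDs per process) can be sketched as below. This is only an illustration under stated assumptions: a plain Map stands in for the Flowable process variables, and the class name ScheduleRunIdSetterSketch and method ensureScheduleRunId are hypothetical, not the listener's real API.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch; a Map stands in for Flowable process variables.
final class ScheduleRunIdSetterSketch {
  static final String SCHEDULE_RUN_ID = "scheduleRunId";

  // Stores a fresh UUID only when none is present and returns the stored id,
  // so repeated calls for the same process keep the original value.
  static UUID ensureScheduleRunId(Map<String, Object> processVariables) {
    Object existing = processVariables.get(SCHEDULE_RUN_ID);
    if (existing instanceof UUID) {
      return (UUID) existing;
    }
    UUID generated = UUID.randomUUID();
    processVariables.put(SCHEDULE_RUN_ID, generated);
    return generated;
  }
}
```

Calling ensureScheduleRunId twice on the same variable map returns the same UUID, while two separate maps (two trigger fires) receive different ones.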

…or, stage entity enrichment

- Remove MultiInstanceLoopCharacteristics from PeriodicBatchEntityTrigger; one CallActivity
  per fetch iteration passes the full entityList to the main workflow
- Add scheduleRunId UUID generated once per trigger fire, propagated into every spawned
  WorkflowInstance and WorkflowInstanceState row for batch grouping
- Add WorkflowScheduleRunIdSetterListener (idempotent, fires before WorkflowInstanceListener)
  and WorkflowScheduleRunIdReader shared utility
- Add entityList and updatedBy to WorkflowInstance and WorkflowInstanceState schemas;
  stage.entityList is resolved at stage START using the node's inputNamespaceMap so each
  stage records exactly the entity list it received via routing (global, gold, true, etc.)
- Add getEntityListFromVariables static overload to WorkflowVariableHandler for map-based
  resolution without a live DelegateExecution
- Add NewStageRequest record to carry stage creation params (avoids 5-param cap)
- Add scheduleRunId generated columns + indexes to 1.14.0 MySQL and Postgres migrations
- Add listByScheduleRunId to WorkflowInstance and WorkflowInstanceState DAOs
- Add scheduleRunId query param to WorkflowInstanceResource and WorkflowInstanceStateResource
- Add unit tests: WorkflowVariableHandlerTest, WorkflowScheduleRunIdReaderTest,
  WorkflowScheduleRunIdSetterListenerTest, PeriodicBatchEntityTriggerTest, TriggerFactoryTest

github-actions bot commented Apr 6, 2026

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

yan-3005 self-assigned this Apr 6, 2026
yan-3005 added the "safe to test" and "backend" labels Apr 6, 2026

github-actions bot commented Apr 6, 2026

✅ TypeScript Types Auto-Updated

The generated TypeScript types have been automatically updated based on JSON schema changes in this PR.

github-actions bot requested a review from a team as a code owner April 6, 2026 11:07
…PR comments

- WorkflowVariableHandler: add getUpdatedByFromVariables() to resolve
  updatedBy via inputNamespaceMap (mirrors getEntityListFromVariables)
- WorkflowInstanceStageListener: factor out getStageInputNamespaceMap(),
  add resolveStageUpdatedBy(), pass resolved value to updateStage();
  narrow catch(Exception) to EntityNotFoundException
- WorkflowInstanceStateRepository.updateStage: accept resolved updatedBy
  as parameter, drop broken extractUpdatedBy() that used stage.getName()
  as namespace (never matched real values)
- WorkflowInstanceRepository: replace non-deterministic extractUpdatedBy()
  (HashMap scan) with extractUpdatedByFromStates() that picks the latest
  stage's correctly-resolved updatedBy
- WorkflowFailureListener: read scheduleRunId from RuntimeService before
  building NewStageRequest so failure stages carry the correlation id;
  import NewStageRequest directly instead of FQN
- WorkflowScheduleRunIdReader.toUuid: catch IllegalArgumentException,
  log warning, return null instead of throwing on malformed input
- WorkflowInstanceResource/StateResource: route scheduleRunId queries
  directly through listByScheduleRunId() DAO method instead of silently
  ignoring the filter parameter via ListFilter
Delegate to WorkflowScheduleRunIdReader.toUuid() (which handles both UUID
and String) instead of instanceof UUID checks that silently drop String
values Flowable may have serialized during persistence.
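The tolerant conversion described in these commits (accept a UUID or a String, return null on malformed input) might look roughly like the sketch below. It is an illustration, not the actual WorkflowScheduleRunIdReader source: the class name ScheduleRunIdParsingSketch is hypothetical, and System.err stands in for whatever logger the real code uses.

```java
import java.util.UUID;

// Hypothetical sketch of a null-safe, type-tolerant UUID conversion.
final class ScheduleRunIdParsingSketch {
  static UUID toUuid(Object value) {
    if (value == null) {
      return null;
    }
    if (value instanceof UUID) {
      return (UUID) value;
    }
    try {
      // Covers String values that may come back after variable persistence.
      return UUID.fromString(value.toString());
    } catch (IllegalArgumentException e) {
      // Assumption: the real code logs a warning through its logger instead.
      System.err.println("Ignoring malformed scheduleRunId: " + value);
      return null;
    }
  }
}
```

Routing every read through one such helper avoids the instanceof UUID checks that silently drop String values.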

github-actions bot commented Apr 6, 2026

Jest test Coverage

UI tests summary

Lines: 63%
Statements: 63.83% (59800/93684)
Branches: 43.66% (31344/71789)
Functions: 46.71% (9418/20161)

…gination

Add getScheduleRunIdCondition() to ListFilter following the getServerIdCondition()
pattern so scheduleRunId composes correctly with startTs/endTs/limit/latest.
Remove the special-case bypass branch in both resource list handlers and delete
the now-dead listByScheduleRunId helpers from the repositories and CollectionDAO.
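As a rough illustration of why a filter-level condition composes with the other query parameters, the sketch below builds one WHERE clause from whichever parameters are present. It is not the real ListFilter: the class ListFilterSketch, the column names scheduleRunId and timestamp, and the bind-parameter names are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; column and bind-parameter names are assumed.
final class ListFilterSketch {
  private final Map<String, String> queryParams;

  ListFilterSketch(Map<String, String> queryParams) {
    this.queryParams = queryParams;
  }

  // Returns an empty string when the parameter is absent, so the caller
  // can skip it without special-casing.
  String getScheduleRunIdCondition() {
    return queryParams.get("scheduleRunId") == null
        ? ""
        : "scheduleRunId = :scheduleRunId";
  }

  String getCondition() {
    List<String> conditions = new ArrayList<>();
    String scheduleRunId = getScheduleRunIdCondition();
    if (!scheduleRunId.isEmpty()) {
      conditions.add(scheduleRunId);
    }
    if (queryParams.get("startTs") != null) {
      conditions.add("timestamp >= :startTs");
    }
    if (queryParams.get("endTs") != null) {
      conditions.add("timestamp <= :endTs");
    }
    return conditions.isEmpty() ? "" : "WHERE " + String.join(" AND ", conditions);
  }
}
```

Because every condition is optional and joined with AND, the same list path serves filtered and unfiltered queries, which is what makes a separate bypass branch unnecessary.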
yan-3005 and others added 4 commits April 15, 2026 23:31
…workflow-instance-states-impl

# Conflicts:
#	openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java

The Java checkstyle failed.

Please run mvn spotless:apply in the root of your repository and commit the changes to this PR.
You can also use pre-commit to automate the Java code formatting.

You can install the pre-commit hooks with make install_test precommit_install.


String stage =
    Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName);
WorkflowDefinition workflowDefinition = fetchWorkflowDefinition(workflowDefinitionName);

⚠️ Performance: fetchWorkflowDefinition called twice per stage event (start + end)

In WorkflowInstanceStageListener, fetchWorkflowDefinition is called in both addNewStage (stage START) and updateStage (stage END), resulting in two DB lookups per stage lifecycle. The WorkflowDefinition doesn't change between start and end, so the result from the START call could be cached on the execution (e.g., as a transient variable or instance field) and reused at END.

This was flagged as resolved in previous findings for the duplicate lookup within a single event, but a new duplicate was introduced across start/end events for the same stage.

Suggested fix:

Store the WorkflowDefinition as a process-scoped variable or
cache it in a local variable keyed by workflowDefinitionName,
so the END event reuses the definition fetched at START.
For example, store it in varHandler at START:
  varHandler.setNodeVariable("cachedWorkflowDef", workflowDefinition);
and retrieve it at END before falling back to DB.
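One way to picture the "fetch once, reuse at END" idea is a small in-memory cache keyed by workflowDefinitionName, sketched below. Note that this swaps in a different technique from the variable-based suggestion above: it does not store anything on the execution. The class DefinitionCacheSketch and its loader function are hypothetical, and a real cache would need invalidation whenever a definition is updated.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch; D stands in for the WorkflowDefinition type and
// loader for the DB lookup.
final class DefinitionCacheSketch<D> {
  private final Map<String, D> cache = new ConcurrentHashMap<>();
  private final Function<String, D> loader;

  DefinitionCacheSketch(Function<String, D> loader) {
    this.loader = loader;
  }

  // Loads on first access (stage START) and serves the cached value
  // on later accesses (stage END).
  D get(String workflowDefinitionName) {
    return cache.computeIfAbsent(workflowDefinitionName, loader);
  }

  // Must be called when a definition changes, or stale data is served.
  void invalidate(String workflowDefinitionName) {
    cache.remove(workflowDefinitionName);
  }
}
```

With such a cache, the START and END handlers of one stage trigger a single load; the trade-off is the extra invalidation path that the variable-based approach does not need.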


gitar-bot bot commented Apr 15, 2026

Code Review ⚠️ Changes requested 5 resolved / 6 findings

Refactors governance workflows to support scheduleRunId correlation and stage enrichment, while resolving several data handling and type-safety issues. However, the WorkflowInstanceStageListener currently performs redundant database lookups for workflow definitions during the stage lifecycle.

⚠️ Performance: fetchWorkflowDefinition called twice per stage event (start + end)

📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java:181 📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java:276

In WorkflowInstanceStageListener, fetchWorkflowDefinition is called in both addNewStage (stage START) and updateStage (stage END), resulting in two DB lookups per stage lifecycle. The WorkflowDefinition doesn't change between start and end, so the result from the START call could be cached on the execution (e.g., as a transient variable or instance field) and reused at END.

This was flagged as resolved in previous findings for the duplicate lookup within a single event, but a new duplicate was introduced across start/end events for the same stage.

Suggested fix
Store the WorkflowDefinition as a process-scoped variable or
cache it in a local variable keyed by workflowDefinitionName,
so the END event reuses the definition fetched at START.
For example, store it in varHandler at START:
  varHandler.setNodeVariable("cachedWorkflowDef", workflowDefinition);
and retrieve it at END before falling back to DB.
✅ 5 resolved
Bug: scheduleRunId filter is silently ignored — API param has no effect

📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceResource.java:133-135 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceStateResource.java:119-121 📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java:9443-9446 📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java:9468-9471
The scheduleRunId query parameter added to WorkflowInstanceResource (line 134) and WorkflowInstanceStateResource (line 120) is stored in the ListFilter via addQueryParam(), but ListFilter.getCondition() has no getScheduleRunIdCondition() implementation. The standard repository.list() path builds its WHERE clause from filter.getCondition(), so the parameter is completely ignored at query time. The dedicated listByScheduleRunId() DAO methods exist but are never called from the list endpoint.

This means the new API feature advertised in the PR (filtering by scheduleRunId) does not actually work — all queries return unfiltered results regardless of the parameter value.

Bug: WorkflowFailureListener passes null scheduleRunId for failure stages

📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java:216-223
In WorkflowFailureListener.addFailureStage() (lines 216-223), the NewStageRequest is constructed with null for scheduleRunId. The listener receives a FlowableEvent (not a DelegateExecution), so WorkflowScheduleRunIdReader.readFrom() can't be called directly. However, RuntimeService is already available in the listener (used elsewhere in the class). Failure stages created this way won't appear in schedule-run-grouped queries, breaking the correlation guarantee that all stages from one trigger fire share a scheduleRunId.

Edge Case: toUuid throws unchecked exception on malformed variable value

📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdReader.java:24-26
WorkflowScheduleRunIdReader.toUuid() (line 25) calls UUID.fromString(value.toString()) for any non-UUID object. If the variable holds a corrupted or unexpected value (e.g., an integer, empty string), this throws IllegalArgumentException. While the outer try-catch in callers (WorkflowInstanceStageListener, WorkflowInstanceListener) would catch it, the error message would be opaque — a UUID parse failure rather than a clear 'invalid scheduleRunId' message.

Bug: readScheduleRunIdFromProcess ignores String-typed schedule run IDs

📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java:251-265 📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdReader.java:26-39
The new readScheduleRunIdFromProcess in WorkflowFailureListener only handles instanceof UUID checks (lines 256, 263), but Flowable can store the variable as a String — which WorkflowScheduleRunIdReader.toUuid() already handles via UUID.fromString(). When the variable is a String, both the plain and namespaced lookups silently fall through, and the failure stage is recorded with a null scheduleRunId, breaking batch-grouping queries for failed workflow instances.

This is the exact scenario the existing WorkflowScheduleRunIdReader.readFrom() was designed for — it delegates to toUuid() which handles both UUID and String types.

Performance: Duplicate WorkflowDefinition DB lookup per stage start event

📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java:186-196 📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java:207-218 📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java:96-101
Each stage START event now triggers two separate DB lookups for the same WorkflowDefinition:

  1. resolveStageEntityList → getStageInputNamespaceMap → workflowDefinitionRepository.getByNameForStageProcessing()
  2. addNewStageToInstance → workflowDefinitionRepository.getByNameForStageProcessing()

Similarly, each stage END event calls resolveStageUpdatedBy → getStageInputNamespaceMap, which fetches the definition again (though updateStage in the repository doesn't re-fetch).

For a workflow with N stages, this adds N unnecessary DB roundtrips. The WorkflowDefinition should be fetched once and reused.

